## Import every library needed in the notebook
import pickle
import tensorflow as tf
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.gridspec as gs
from scipy import ndimage
import random
import cv2
import gc
import glob
from skimage import transform as trans
from skimage import img_as_ubyte
from skimage import restoration
from PIL import ImageEnhance
from PIL import Image
from tqdm import tqdm
from sklearn.utils import shuffle
import warnings
import glob
import textwrap
warnings.filterwarnings("ignore")
# Locations of the pickled data splits and of the folder where figures are saved.
training_file = 'data/train.p'
validation_file= 'data/valid.p'
testing_file = 'data/test.p'
images_dir = 'images_report/'
# Load the three pickled splits.
# NOTE(review): pickle.load executes arbitrary code from the file; only use
# with data files from a trusted source.
with open(training_file, mode='rb') as f:
    train = pickle.load(f)
with open(validation_file, mode='rb') as f:
    valid = pickle.load(f)
with open(testing_file, mode='rb') as f:
    test = pickle.load(f)
# Each split is indexed by 'features' (images) and 'labels' (class ids).
X_train, y_train = train['features'], train['labels']
X_valid, y_valid = valid['features'], valid['labels']
X_test, y_test = test['features'], test['labels']
# Number of training examples.
n_train = X_train.shape[0]
# Number of testing examples.
n_test = X_test.shape[0]
# Number of validation examples.
n_valid = X_valid.shape[0]
# Shape of a single traffic sign image (every axis after the sample axis).
image_shape = X_train.shape[1:]
# The number of classes is the number of rows in signnames.csv; `signnames`
# ends up as a dict mapping the ClassId column to the SignName column.
signnames = pd.read_csv('signnames.csv')
n_classes = signnames.shape[0]
signnames = pd.DataFrame(signnames)
signnames = dict(zip(list(signnames.ClassId), list(signnames.SignName)))
# Print a summary of the dataset.
print("Number of training examples =\t", n_train)
print("Number of testing examples =\t", n_test)
print("Number of validation examples =\t", n_valid)
print("Image data shape =\t\t", image_shape)
print("Number of classes =\t\t", n_classes)
# Count how many training samples each class has.
classes, counts = np.unique(y_train, return_counts=True)
classes_count = dict(zip(classes, counts))
# Plot the per-class counts as a bar chart (x ticks are the sign names) and
# save the figure into images_dir.
fig = plt.figure(figsize=(20,5))
plt.title('Training classes distribution')
plt.xticks(list(signnames.keys()), signnames.values(),rotation = 'vertical')
plt.bar(classes,counts)
plt.show()
fig.savefig(images_dir + 'training_distribution.png')
%matplotlib inline
def plotGrid(sample_set, labels, signnames, name, columns=5, cutoff = 5):
    """Plot random samples in a grid, one row per class, and save the figure.

    The samples are sorted by label; for each of the first `cutoff` classes
    (in ascending label order) up to `columns` randomly chosen samples are
    drawn in one row. The class name is written to the left of the row.

    Args:
        sample_set: array of images indexed along the first axis.
        labels: NumPy array of class ids, same length as `sample_set`.
        signnames: mapping from class id to a human readable name.
        name: file name (without extension) of the PNG written to the
            module-level `images_dir`.
        columns: number of samples shown per class.
        cutoff: maximum number of classes (rows) to draw.
    """
    assert len(sample_set) == len(labels)
    input_set = sample_set[labels.argsort()]
    classes, counts = np.unique(labels, return_counts=True)
    # indexes[i]:indexes[i+1] is the slice of the sorted set holding class i.
    indexes = np.append([0],np.cumsum(counts))
    n_rows = min(len(classes),cutoff)
    figsize = (3*columns,3*n_rows)
    # squeeze=False keeps `axes` two-dimensional even when there is a single
    # row or a single column, so the nested loops below always work.
    fig, axes = plt.subplots(n_rows, columns, figsize=figsize, squeeze=False)
    fig.subplots_adjust(hspace=0, wspace=0)
    for i, row in enumerate(axes):
        index = list(range(indexes[i], indexes[i+1]))
        random.shuffle(index)
        for j, ax in enumerate(row):
            if index:
                image = input_set[index.pop()]
                if image.ndim < 3:
                    # Plain 2-D image: already a single channel.
                    ax.imshow(image, cmap='gray')
                elif image.shape[2] < 3:
                    ax.imshow(image[:,:,0], cmap='gray')
                else:
                    ax.imshow(image)
            if j == 0:
                ax.set_ylabel(textwrap.fill(signnames[classes[i]],15),rotation=0, fontsize=20, horizontalalignment = 'right')
            ax.get_xaxis().set_ticks([])
            ax.get_yaxis().set_ticks([])
    fig.savefig(images_dir+name+'.png')


plotGrid(X_train,y_train,signnames,'raw_visualization',8,10)
def transform_augmentation(input, args):
    """Warp the image with a random affine shear and rotation.

    Both factors are drawn from a normal distribution (mean 0, std 0.1).
    `args['mode']` is forwarded to `skimage.transform.warp` as its `mode`
    argument. Returns a one-element list holding the warped image converted
    with `img_as_ubyte`.
    """
    shear_factor = np.random.normal(0, .1)
    rotation_factor = np.random.normal(0, .1)
    affine = trans.AffineTransform(shear=shear_factor, rotation=rotation_factor)
    warped = trans.warp(input, affine, mode=args['mode'])
    return [img_as_ubyte(warped)]
#def scatterpoints(input, sigma = 5):
# """scatter points within a circle of radius sigma"""
# output = np.empty((0,2),dtype= 'uint8')
# for i in input:
# output = np.append(output,[[int(random.gauss(i[0],sigma)),int(random.gauss(i[1],sigma))]],axis = 0)
# return output.astype(np.float32)
def perspective_augmentation(input, args):
    """Apply a random perspective tilt around the vertical or horizontal axis.

    The four image corners are shifted by a random offset `d` (2..8 pixels,
    either sign) to build the target quadrilateral; which of the two tilt
    directions is used is chosen at random. Borders are filled by replicating
    edge pixels. The output has the same width and height as the input
    (previously hard-coded to 32x32 and assuming a square image).

    Args:
        input: image array whose first two axes are (height, width).
        args: unused; present so all augmentations share one signature.

    Returns:
        A one-element list holding the warped image converted with
        `img_as_ubyte`.
    """
    height, width = input.shape[:2]
    d = random.choice([random.randint(-8,-2),random.randint(2,8)])
    # Corner points are (x, y): top-left, bottom-left, top-right, bottom-right.
    src = np.float32([[0,0],[0,height],[width,0],[width,height]])
    vertical = np.float32([[d,0],[-d,height],[width-d,0],[width+d,height]])
    horizontal = np.float32([[0,d],[0,height-d],[width,-d],[width,height+d]])
    dst = random.choice([vertical,horizontal])
    tform = cv2.getPerspectiveTransform(src,dst)
    modified = cv2.warpPerspective(input, tform, (width,height), borderMode = cv2.BORDER_REPLICATE)
    output = img_as_ubyte(modified)
    return [output]
def destroy_augmentation(input, args):
    """Overwrite random pixels with noise centred on the image median.

    `args['destroy_intensity']` pixel positions are drawn uniformly (with
    replacement) and each one is replaced, on every channel, by a value drawn
    from a normal distribution with the image median as mean and a standard
    deviation of 1. For integer images the noise is clipped to the dtype range
    before it is stored, so it can no longer wrap around (e.g. 256 -> 0).

    Args:
        input: image array of shape (H, W) or (H, W, C); it is not modified.
        args: dict providing the integer 'destroy_intensity'.

    Returns:
        A one-element list holding the modified copy of the image.
    """
    count = args['destroy_intensity']
    output = input.copy()
    rows = np.random.randint(0, input.shape[0], count)
    cols = np.random.randint(0, input.shape[1], count)
    # The median does not change between pixels: compute it once.
    values = np.random.normal(np.median(input), 1, count)
    if np.issubdtype(output.dtype, np.integer):
        limits = np.iinfo(output.dtype)
        values = np.clip(values, limits.min, limits.max)
    if output.ndim > 2:
        # One value per pixel, broadcast across the trailing (channel) axes.
        values = values.reshape((count,) + (1,) * (output.ndim - 2))
    output[rows, cols] = values
    return [output]
def enhance_augmentation(input, args):
    """Apply one randomly chosen enhancer with a random strength.

    An enhancer class is picked at random from `args['enhance_functions']`,
    instantiated on the image and applied with a factor drawn uniformly from
    [0.6, 2]. Returns a one-element list holding the result as a uint8 array.
    """
    pil_image = Image.fromarray(input)
    enhancer_cls = random.choice(args['enhance_functions'])
    enhancer = enhancer_cls(pil_image)
    strength = random.uniform(0.6, 2)
    return [np.array(enhancer.enhance(strength), dtype='uint8')]
def flip_augmentation(input, args):
    """Return the flipped variants that are allowed for the image's class.

    `args['label']` selects which flips apply: a left-right flip, an up-down
    flip, and/or a left-right flip followed by an up-down flip. The result is
    an array stacking every produced image along a new first axis; it has
    length 0 when the class is in none of the lists.
    """
    label = args['label']
    horizontal_ids = (11, 12, 13, 15, 17, 18, 22, 26, 30, 35)  # left-right flip
    vertical_ids = (1, 5, 12, 15, 17)                          # up-down flip
    both_ids = (32, 40)                                        # left-right, then up-down

    variants = []
    if label in horizontal_ids:
        variants.append(np.fliplr(input))
    if label in vertical_ids:
        variants.append(np.flipud(input))
    if label in both_ids:
        variants.append(np.flipud(np.fliplr(input)))

    stacked = np.empty((len(variants),) + input.shape, dtype=input.dtype)
    for slot, variant in enumerate(variants):
        stacked[slot] = variant
    return stacked
#def get_required_augdata(classes_count):
# """get the quantity of data required to fulfill each set, so the set is equally balanced """
# output = classes_count.copy()
# max_count = np.max(list(classes_count.values()))
# for i in classes_count:
# output[i] = max_count - classes_count[i]
# return output
def augment_data(sample_set, labels_set, expansion_size, functions,args):
    """Generate new samples for every class and append them to the data.

    For each class present in `labels_set`, random source images of that class
    are passed to randomly chosen augmentation functions until at least
    `expansion_size` new images exist (a function may return several images,
    or none, so the count can slightly exceed `expansion_size`).

    Args:
        sample_set: array of images indexed along the first axis.
        labels_set: NumPy array of class ids, same length as `sample_set`.
        expansion_size: minimum number of images generated per class.
        functions: callables `f(image, args)` returning a sequence of images.
        args: options forwarded to the callables. The current class id is
            passed as 'label' in a per-class copy, so the caller's dict is
            left untouched.

    Returns:
        `[samples, labels]`: the class-sorted originals followed by the
        generated images, and the matching labels.

    Note:
        If none of `functions` can produce an image for some class the inner
        loop never terminates.
    """
    sample_set = sample_set[labels_set.argsort()]
    classes, counts = np.unique(labels_set, return_counts=True)
    # indexes[k]:indexes[k+1] is the slice of the sorted set holding classes[k].
    indexes = np.append([0],np.cumsum(counts))
    # Collect chunks and concatenate once instead of re-copying the whole
    # data set on every np.append.
    sample_chunks = [sample_set]
    label_chunks = [np.sort(labels_set)]
    for position, label in enumerate(tqdm(classes)):
        call_args = dict(args, label=label)
        generated = []
        while len(generated) < expansion_size:
            index = random.randint(indexes[position],indexes[position+1]-1)
            generated.extend(random.choice(functions)(sample_set[index],call_args))
        if generated:
            sample_chunks.append(np.array(generated, dtype = sample_set.dtype))
            label_chunks.append(np.full(len(generated), label, dtype = classes.dtype))
    samples = np.concatenate(sample_chunks, axis=0)
    labels = np.concatenate(label_chunks)
    return [samples,labels]
def balance_data(sample_set, labels_set, cutoff=2000):
    """Draw `cutoff` random samples (with replacement) from every class.

    Args:
        sample_set: array of images indexed along the first axis.
        labels_set: NumPy array of class ids, same length as `sample_set`.
        cutoff: number of samples drawn per class.

    Returns:
        dict with 'x' (the drawn samples, grouped by ascending class id) and
        'y' (the matching class ids, in the dtype of `labels_set`).
    """
    sample_set = sample_set[labels_set.argsort()]
    classes, counts = np.unique(labels_set, return_counts=True)
    # indexes[k]:indexes[k+1] is the slice of the sorted set holding classes[k].
    indexes = np.append([0],np.cumsum(counts))
    picks = []
    for position in tqdm(range(len(classes))):
        # randint's upper bound is exclusive, so the slice end itself is the
        # correct bound; `end - 1` would never pick the last sample of a class
        # and would fail for a class with a single sample.
        picks.append(np.random.randint(indexes[position],indexes[position+1],cutoff))
    if picks:
        idxs = np.concatenate(picks)
    else:
        idxs = np.empty((0,), dtype = int)
    x_balanced = sample_set[idxs]
    y_balanced = np.repeat(classes, cutoff)
    return {'x':x_balanced,'y': y_balanced}
# Show one training image next to two random results of every augmentation
# (one augmentation per row) and save the figure.
i = 3497
original = X_train[i]
augmentations = [destroy_augmentation,transform_augmentation,flip_augmentation,perspective_augmentation,enhance_augmentation]
aug_labels = ['Destroy augmentation','Transform augmentation','Flip augmentation','Perspective augmentation','Enhance augmentation']
args = {'mode':'edge',
        'destroy_intensity':150,
        'label':1,
        'enhance_functions':[ImageEnhance.Contrast,
                             ImageEnhance.Brightness]}
fig, axes = plt.subplots(5,3, figsize = (20,20))
fig.subplots_adjust(hspace=0, wspace=0)
axes[0][0].set_title('Original', fontsize=20)
for generated_column in (1, 2):
    axes[0][generated_column].set_title('Random Generated picture', fontsize=20)
for i, row in enumerate(axes):
    for j,ax in enumerate(row):
        ax.get_xaxis().set_ticks([])
        ax.get_yaxis().set_ticks([])
        if j == 0:
            # First column: the untouched image, labelled with the row's augmentation.
            ax.imshow(original)
            ax.set_ylabel(textwrap.fill(aug_labels[i],15),rotation=0, fontsize=20, horizontalalignment = 'right')
            continue
        ax.imshow(augmentations[i](original,args)[0])
plt.show()
fig.savefig(images_dir+'augmentations_examples.png')
# When True the augmented data set is always regenerated; otherwise the cached
# 'new_data.p' is loaded if it can be opened.
force_data_augmentation = True
# Functions used to augment the data.
functions=[destroy_augmentation,transform_augmentation,flip_augmentation,perspective_augmentation,enhance_augmentation]
# Parameters used to augment the data ('label' is filled in by augment_data).
args = {'mode':'edge',
'destroy_intensity':300,
'enhance_functions':[ImageEnhance.Contrast,
ImageEnhance.Brightness]}
def augmentation_process(extension=4000,cutoff=4000):
    """Augment the training set, balance it and cache the result on disk.

    Args:
        extension: minimum number of images generated per class
            (forwarded to `augment_data`).
        cutoff: number of samples kept per class (forwarded to `balance_data`).

    Returns:
        The dict returned by `balance_data`, which is also pickled to
        'new_data.p'.
    """
    x,y = augment_data(X_train,y_train, extension,functions,args)
    new_data = balance_data(x,y,cutoff)
    # Context manager so the file is flushed and closed even if dump fails.
    with open('new_data.p','wb') as cache_file:
        pickle.dump(new_data, cache_file)
    return new_data
# Regenerate the augmented data when forced, or when the cache cannot be read.
if force_data_augmentation:
    new_data = augmentation_process()
else:
    try:
        # Context manager so the cache file handle is always closed.
        with open('new_data.p','rb') as cache_file:
            new_data = pickle.load(cache_file)
    except (OSError, IOError):
        new_data = augmentation_process()
# Per-class sample counts of the augmented / balanced data set.
aug_classes, aug_counts = np.unique(new_data['y'], return_counts=True)
classes_count_aug = dict(zip(aug_classes, aug_counts))
# Draw the balanced counts and, on top of them, the original counts
# (`classes` and `counts` come from the raw training labels), then save.
fig = plt.figure(figsize=(20,5))
plt.title('Balanced training classes distribution')
plt.xticks(list(signnames.keys()), signnames.values(),rotation = 'vertical')
plt.bar(classes,aug_counts, label="with augmentation")
plt.bar(classes,counts,label="without augmentation")
plt.legend()
plt.show()
fig.savefig(images_dir + 'balanced_training_distribution.png')
# Visual check of the augmented samples (up to 43 class rows, 8 per class).
plotGrid(new_data['x'],new_data['y'],signnames,'augmented_data',8,43)
# Choose the training input: the raw training set when bypassing, otherwise
# the augmented / balanced one (n_train is then updated to its size).
bypass_augmentation = False
if bypass_augmentation:
    X_train_input, y_train_input = X_train, y_train
else:
    X_train_input, y_train_input = new_data['x'], new_data['y']
    n_train = X_train_input.shape[0]
print("New Number of training examples =\t", n_train)
def normalization(input,l=0.1,u=0.9):
    """Rescale every channel of an image linearly into the range [l, u].

    Each channel is min-max scaled on its own. A channel whose values are all
    equal (zero range) is filled with the upper bound `u`.

    Args:
        input: image array of shape (H, W, C).
        l: lower bound of the output range.
        u: upper bound of the output range.

    Returns:
        float64 array with the same shape as `input`.
    """
    # np.float was removed from NumPy; float64 is the type it aliased.
    output = np.zeros_like(input, dtype = np.float64)
    span = u - l
    for ch in range(input.shape[2]):
        channel = input[:,:,ch]
        low = np.min(channel)
        high = np.max(channel)
        if high == low:
            # Flat channel: avoid 0/0 and use the upper bound.
            output[:,:,ch] = u
        else:
            output[:,:,ch] = l + (channel - low)/(high - low)*span
    return output
def histogram_equalization(input):
    """Equalize the V channel of the image with CLAHE and return it as HSV.

    The image is converted with `cv2.COLOR_RGB2HSV`, a contrast limited
    adaptive histogram equalization (clip limit 2.0, 8x8 tiles) is applied to
    channel 2, and the HSV image is returned (it is not converted back).
    """
    # The former `np.zeros_like(input, dtype=np.float)` pre-allocation was
    # immediately overwritten and fails on NumPy versions without np.float.
    hsv = cv2.cvtColor(input, cv2.COLOR_RGB2HSV)
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
    hsv[:,:,2] = clahe.apply(hsv[:,:,2])
    return hsv
def grayScale(input):
    """Collapse the first three channels into one with a weight of 0.3 each.

    Note that the weights sum to 0.9, so the result is 0.9 times the plain
    channel average.
    """
    red, green, blue = (input[:, :, channel] for channel in range(3))
    return red * 0.3 + green * 0.3 + blue * 0.3
def highpass(input,kernel = np.array([[-1, -1, -1],[-1, 8, -1], [-1, -1, -1]]), scale= 10.0, offset = 128):
    """Convolve every channel with `kernel / scale` and add `offset`.

    The convolution is carried out in float64. With an integer image
    `ndimage.convolve` would otherwise return the image's integer dtype, so
    negative or large filter responses would wrap around.

    Args:
        input: image array of shape (H, W, C).
        kernel: 2-D convolution kernel (it is not modified).
        scale: divisor applied to the kernel.
        offset: constant added to the filter response.

    Returns:
        float64 array with the same shape as `input`.
    """
    weights = np.asarray(kernel, dtype=np.float64)/scale
    output = np.zeros_like(input,dtype=np.float64)
    for ch in range(input.shape[2]):
        channel = input[:,:,ch].astype(np.float64)
        output[:,:,ch] = ndimage.convolve(channel, weights)+offset
    return output
def add(A,B):
    """Add two images with saturation and return the result as uint8.

    The sum is computed in float64 and clipped to [0, 255] before the cast,
    so neither the addition of two uint8 images nor the cast can wrap around.
    Fractional results are truncated by the cast.
    """
    total = np.asarray(A, dtype=np.float64) + np.asarray(B, dtype=np.float64)
    return np.clip(total, 0, 255).astype('uint8')
def linearBurn(A,B):
    """Linear burn blend: A + B - 255, saturated to [0, 255], as uint8.

    The arithmetic is done in float64 and clipped on both ends before the
    cast, so neither the addition of two uint8 images nor the cast can wrap
    around. Fractional results are truncated by the cast.
    """
    total = np.asarray(A, dtype=np.float64) + np.asarray(B, dtype=np.float64) - 255
    return np.clip(total, 0, 255).astype('uint8')
def enhance(input,iteration = 1,
            kernel = np.array([[-1, -1, -1],
                               [-1, 8, -1],
                               [-1, -1, -1]])):
    """Sharpen an image and push bright pixels up and dark pixels down.

    Each iteration computes a high-pass of the current image, blends it with
    the image by linear dodge (`add`) and by linear burn (`linearBurn`), and
    mixes the two with per-pixel weights obtained by normalizing the current
    image to [0, 1]: bright pixels take mostly the dodge result, dark pixels
    mostly the burn result.

    Args:
        input: image array of shape (H, W, C).
        iteration: number of passes; every pass works on the result of the
            previous one. With 0 the input is returned unchanged.
        kernel: high-pass kernel forwarded to `highpass`.

    Returns:
        uint8 image of the same shape (the input object itself when
        `iteration` is 0).
    """
    output = input
    for _ in range(iteration):
        h_pass = highpass(output, kernel)
        linear_dodge = add(h_pass,output)
        linear_burn = linearBurn(h_pass,output)
        weights = normalization(output,0,1)
        upper = np.multiply(weights,linear_dodge)
        lower = np.multiply(1-weights,linear_burn)
        output = (upper+lower).astype('uint8')
    return output
def plot_luminance_histogram(input,axe,bins = 15):
    """Draw a histogram of the mean of the first three channels on `axe`.

    The channels are converted to float64 before they are averaged, so the
    sum of three uint8 values cannot overflow.

    Args:
        input: image array of shape (H, W, C) with at least three channels.
        axe: object providing `hist(data, bins=..., range=...)`
            (a matplotlib Axes in this file).
        bins: number of histogram bins.
    """
    luminance = input[:,:,:3].astype(np.float64).mean(axis=2)
    axe.hist(luminance.flatten(), bins = bins, range = (np.min(luminance),np.max(luminance)))
def dividePicture(input, div):
    """Cut the picture into a square grid of `div` equally sized cells.

    `div` must be 1, 4, 16 or 64. The cells are returned as a flat list,
    row of cells after row of cells.
    """
    assert(div in [1,4,16,64])
    cells_per_side = int(np.sqrt(div))
    return [cell
            for strip in np.split(input, cells_per_side)
            for cell in np.split(strip, cells_per_side, axis = 1)]
def getColorInfo(input, divisions = 1, colors = 6):
    """Build a feature vector of per-cell histograms of channel 0.

    Channel 0 of the picture is split into `divisions` cells with
    `dividePicture`; for every cell a histogram with `colors` bins is computed
    and divided by the number of pixels in the cell. All histograms are
    concatenated into one flat list. Each histogram spans the value range of
    its own cell.
    """
    first_channel = input[:,:,0]
    features = []
    for cell in dividePicture(first_channel, divisions):
        bin_counts, _ = np.histogram(cell, bins = colors)
        features.extend(bin_counts/cell.size)
    return features
# Walk one training image through every preprocessing stage: the left column
# shows the image, the right column the matching histogram drawn by
# plot_luminance_histogram.
fig, axes = plt.subplots(4,2,figsize=(10,10))
fig.subplots_adjust(hspace=0.5, wspace=0.5)
#i = random.randint(0,n_classes)
i = 10000
# Stage 0: the untouched image.
axes[0][0].imshow(X_train_input[i])
axes[0][0].set_title('Original')
plot_luminance_histogram(X_train_input[i],axes[0][1])
# Stage 1: one enhance pass.
enhanced = enhance(X_train_input[i],1)
axes[1][0].imshow(enhanced)
axes[1][0].set_title('Enhance')
plot_luminance_histogram(enhanced,axes[1][1] )
# Stage 2: histogram equalization; the result is an HSV image, so only
# channel 2 is displayed.
equa = histogram_equalization(enhanced)
axes[2][0].imshow(equa[:,:,2], cmap = 'gray')
axes[2][0].set_title('Adaptive histogram equalization')
plot_luminance_histogram(equa,axes[2][1])
# Stage 3: per-channel normalization with the default bounds.
norm = normalization(equa)
axes[3][0].imshow(norm[:,:,2], cmap = 'gray')
axes[3][0].set_title('Normalization')
plot_luminance_histogram(norm,axes[3][1])
plt.show()
# Preprocessing parameters: the color histogram uses a grid of `divisions`
# cells with `colors` bins each.
divisions = 16
colors = 6
# When True the preprocessing is always recomputed instead of loaded from disk.
force_preprocess = True
params = {'divisions':divisions,
          'colors':colors,
          'enhance_iterations':1}
# Output buffers: one single-channel image and one color feature vector of
# length divisions*colors per sample.
# np.float was removed from NumPy; float64 is the type it aliased.
X_train_preprocessed = np.empty(X_train_input.shape[0:3]+(1,),dtype = np.float64)
X_train_color_info = np.empty((X_train_input.shape[0],)+(divisions*colors,),dtype = np.float64)
X_test_preprocessed = np.empty(X_test.shape[0:3]+(1,),dtype = np.float64)
X_test_color_info = np.empty((X_test.shape[0],)+(divisions*colors,),dtype = np.float64)
X_valid_preprocessed = np.empty(X_valid.shape[0:3]+(1,),dtype = np.float64)
X_valid_color_info = np.empty((X_valid.shape[0],)+(divisions*colors,),dtype = np.float64)
def preProcess(input, params):
    """Run the full preprocessing pipeline on one image.

    The image is copied, enhanced (`params['enhance_iterations']` passes),
    histogram-equalized (which yields an HSV image) and normalized. Channel 2
    of the result becomes the network input; channel 0 feeds the color
    histogram features.

    Args:
        input: a single image array; it is not modified.
        params: dict with 'enhance_iterations', 'divisions' and 'colors'.

    Returns:
        `(output, colorInfo)`: the single-channel image of shape (H, W, 1) and
        the flat list of color histogram values.
    """
    p = input.copy()
    p = enhance(p,params['enhance_iterations'])
    p = histogram_equalization(p)
    p = normalization(p)
    colorInfo = getColorInfo(p,params['divisions'],params['colors'])
    # Keep the image's own height and width instead of assuming 32x32.
    output = p[:,:,2].reshape(p.shape[:2]+(1,))
    return output, colorInfo
def preProcessAndSave():
    """Preprocess the train, test and validation sets and pickle the results.

    The module-level output buffers are filled in place and then each one is
    written to its own '<buffer name>.p' file in the working directory.
    """
    for i in tqdm(range(n_train)):
        X_train_preprocessed[i], X_train_color_info[i] = preProcess(X_train_input[i],params)
    for i in tqdm(range(n_test)):
        X_test_preprocessed[i], X_test_color_info[i] = preProcess(X_test[i],params)
    for i in tqdm(range(n_valid)):
        X_valid_preprocessed[i], X_valid_color_info[i] = preProcess(X_valid[i],params)
    caches = (('X_train_preprocessed.p', X_train_preprocessed),
              ('X_test_preprocessed.p', X_test_preprocessed),
              ('X_valid_preprocessed.p', X_valid_preprocessed),
              ('X_train_color_info.p', X_train_color_info),
              ('X_test_color_info.p', X_test_color_info),
              ('X_valid_color_info.p', X_valid_color_info))
    for path, data in caches:
        # Context manager so every file is flushed and closed.
        with open(path,'wb') as cache_file:
            pickle.dump(data, cache_file)
# Recompute the preprocessing when forced, or when a cache file cannot be read.
if force_preprocess:
    preProcessAndSave()
else:
    try:
        # Context managers so the cache file handles are always closed.
        with open('X_train_preprocessed.p','rb') as cache_file:
            X_train_preprocessed = pickle.load(cache_file)
        with open('X_test_preprocessed.p','rb') as cache_file:
            X_test_preprocessed = pickle.load(cache_file)
        with open('X_valid_preprocessed.p','rb') as cache_file:
            X_valid_preprocessed = pickle.load(cache_file)
        with open('X_train_color_info.p','rb') as cache_file:
            X_train_color_info = pickle.load(cache_file)
        with open('X_test_color_info.p','rb') as cache_file:
            X_test_color_info = pickle.load(cache_file)
        with open('X_valid_color_info.p','rb') as cache_file:
            X_valid_color_info = pickle.load(cache_file)
    except (OSError, IOError):
        preProcessAndSave()
plotGrid(X_train_preprocessed, y_train_input, signnames,'Preprocessed_visualization',10,5)
## Check whether TensorFlow is available and which GPUs it can see
from tensorflow.python.client import device_lib
def get_available_gpus():
    """Return the names of all local devices whose type is 'GPU'."""
    gpu_names = []
    for device in device_lib.list_local_devices():
        if device.device_type == 'GPU':
            gpu_names.append(device.name)
    return gpu_names
print(tf.__version__)
print(get_available_gpus())
# Checkpoint path used by the saver when saving and restoring the model.
save_file = 'checkpoints/best_validation1.ckpt'
# Mean and standard deviation passed to tf.truncated_normal for weight init.
mu = 0;
sigma = 0.1;
# Training schedule.
EPOCHS = 64
BATCH_SIZE = 150
learning_rate = 1e-3
# Factor applied to the L2 penalty in the loss.
beta = 0.01
### Define your architecture here.
### Feel free to use as many code cells as needed.
def maxpool2d(x, k=2):
    """Max-pool `x` with a k x k window, stride k and 'SAME' padding."""
    window = [1, k, k, 1]
    return tf.nn.max_pool(x, ksize=window, strides=window, padding='SAME')
def model(x,c, kp1, kp2, kp3, kp4,weights,biases, n_classes):
    """Build the classification graph.

    Three convolution stages (convolution, bias, ReLU, 2x2 max-pool, dropout)
    are followed by a fully connected layer and an output layer. The fully
    connected layer receives the flattened output of all three stages (stages
    one and two are pooled once more first) concatenated with the color
    features `c`.

    Args:
        x: image batch tensor.
        c: color feature tensor, concatenated along axis 1.
        kp1, kp2, kp3, kp4: dropout keep probabilities for the three
            convolution stages and the fully connected layer.
        weights: dict with keys 'w1'..'w5'.
        biases: dict with keys 'b1'..'b5'.
        n_classes: not used inside the function.

    Returns:
        `(logits, out_1, out_2, out_3, out_4)`: the logits and the ReLU
        activations of the three convolution stages and of the fully
        connected layer (taken before pooling / dropout).
    """
    # The print calls below report the author's expected shape next to the
    # static shape of the tensor that was actually built.
    # Layer one
    conv1 = tf.nn.conv2d(x, weights['w1'], [1,1,1,1],'SAME')
    print('expected: 32x32x32, reality: '+ str(conv1.get_shape()))
    conv1 = tf.nn.bias_add(conv1,biases['b1'])
    conv1 = tf.nn.relu(conv1, name= 'conv1')
    out_1 = conv1;
    conv1 = maxpool2d(conv1,2)
    conv1 = tf.nn.dropout(conv1, kp1)
    print('expected: ?x16x16x32, reality: '+ str(conv1.get_shape()))
    # Layer two
    conv2 = tf.nn.conv2d(conv1, weights['w2'], [1,1,1,1],'SAME')
    print('expected: ?x16x16x64, reality: '+ str(conv2.get_shape()))
    conv2 = tf.nn.bias_add(conv2,biases['b2'])
    conv2 = tf.nn.relu(conv2, name= 'conv2')
    out_2 = conv2
    conv2 = maxpool2d(conv2,2)
    conv2 = tf.nn.dropout(conv2, kp2)
    print('expected: ?x8x8x64, reality: '+ str(conv2.get_shape()))
    # Layer three
    conv3 = tf.nn.conv2d(conv2, weights['w3'], [1,1,1,1],'SAME')
    print('expected: ?x8x8x128, reality: '+ str(conv3.get_shape()))
    conv3 = tf.nn.bias_add(conv3,biases['b3'])
    conv3 = tf.nn.relu(conv3, name= 'conv3')
    out_3 = conv3
    conv3 = maxpool2d(conv3,2)
    conv3 = tf.nn.dropout(conv3, kp3)
    print('expected: ?x4x4x128, reality: '+ str(conv3.get_shape()))
    # Flatten layer: stage one is pooled by 4 and stage two by 2 before
    # flattening; stage three is flattened as is.
    conv1 = maxpool2d(conv1,4)
    conv1flat = tf.contrib.layers.flatten(conv1)
    print('expected: ?x?, reality: '+ str(conv1flat.get_shape()))
    conv2 = maxpool2d(conv2,2)
    conv2flat = tf.contrib.layers.flatten(conv2)
    print('expected: ?x?, reality: '+ str(conv2flat.get_shape()))
    conv3flat = tf.contrib.layers.flatten(conv3)
    print('expected: ?x?, reality: '+ str(conv3flat.get_shape()))
    # NOTE(review): the axis is passed as the first argument here; this looks
    # like the pre-1.0 TensorFlow signature of tf.concat - confirm against the
    # TensorFlow version this is run with.
    fcinput = tf.concat(1,[conv1flat, conv2flat, conv3flat,c])
    # Fully connected layer 1
    fc1 = tf.nn.bias_add(tf.matmul(fcinput,weights['w4']),biases['b4'])
    fc1 = tf.nn.relu(fc1, name= 'fc1')
    out_4 = fc1
    fc1 = tf.nn.dropout(fc1, kp4)
    # output layer
    logits = tf.nn.bias_add(tf.matmul(fc1,weights['w5']),biases['b5'])
    return logits, out_1, out_2, out_3, out_4
A validation set can be used to assess how well the model is performing. A low accuracy on both the training and validation sets implies underfitting. A high accuracy on the training set but a low accuracy on the validation set implies overfitting.
### Train your model here.
### Calculate and report the accuracy on the training and validation set.
### Once a final model architecture is selected,
### the accuracy on the test set should be calculated and reported as well.
### Feel free to use as many code cells as needed.
# Trainable parameters. w1-w3 are 5x5 convolution kernels given as
# [height, width, in_channels, out_channels]; w4 and w5 are the fully
# connected and the output layer. The rows of w4 are the 3584 flattened
# convolution features plus the divisions*colors color features.
weights = {'w1': tf.Variable(tf.truncated_normal([5,5,1,32],mu,sigma)),
           'w2':tf.Variable(tf.truncated_normal([5,5,32,64],mu,sigma)),
           'w3':tf.Variable(tf.truncated_normal([5,5,64,128],mu,sigma)),
           'w4':tf.Variable(tf.truncated_normal([(3584)+divisions*colors,1024],mu,sigma)),
           'w5':tf.Variable(tf.truncated_normal([1024,n_classes],mu,sigma))}
biases = {'b1': tf.Variable(tf.zeros([32])),
          'b2': tf.Variable(tf.zeros([64])),
          'b3': tf.Variable(tf.zeros([128])),
          'b4': tf.Variable(tf.zeros([1024])),
          'b5': tf.Variable(tf.zeros([n_classes])),}
# Dropout keep probabilities fed during training (one per layer).
# NOTE(review): 'k4' keeps only 10% of the fully connected activations -
# confirm this is intended.
keep_prob = { 'k1': .9,
              'k2': .8,
              'k3': .5,
              'k4': .1}
# Graph inputs: image batch, color features, integer labels and the four
# dropout keep probabilities.
x = tf.placeholder(tf.float32, (None, 32, 32,1))
c = tf.placeholder(tf.float32, (None, divisions*colors))
y = tf.placeholder(tf.int32, (None))
keep_prob_layer1 = tf.placeholder(tf.float32)
keep_prob_layer2 = tf.placeholder(tf.float32)
keep_prob_layer3 = tf.placeholder(tf.float32)
keep_prob_layer4 = tf.placeholder(tf.float32)
one_hot_y = tf.one_hot(y,n_classes)
logits, out_1, out_2, out_3, out_4 = model(x,
                                           c,
                                           keep_prob_layer1,
                                           keep_prob_layer2,
                                           keep_prob_layer3,
                                           keep_prob_layer4,
                                           weights,
                                           biases,
                                           n_classes)
# Loss: mean softmax cross entropy plus beta times the L2 norm of the two
# dense weight matrices (w4 and w5).
cross_entropy = tf.nn.softmax_cross_entropy_with_logits(logits=logits, labels=one_hot_y)
loss_operation = tf.reduce_mean(cross_entropy)
regularizers = tf.nn.l2_loss(weights['w4']) + tf.nn.l2_loss(weights['w5'])
loss_operation = tf.reduce_mean(loss_operation + beta*regularizers)
optimizer = tf.train.AdamOptimizer(learning_rate = learning_rate)
training_operation = optimizer.minimize(loss_operation)
saver = tf.train.Saver()
# Accuracy: fraction of samples whose arg-max logit matches the label.
correct_prediction = tf.equal(tf.argmax(logits, 1), tf.argmax(one_hot_y, 1))
accuracy_operation = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
def evaluate(X_data, c_data, y_data):
    """Return the accuracy of the model on a data set, computed in batches.

    The data is fed to the default session in chunks of BATCH_SIZE with all
    dropout keep probabilities set to 1; the per-batch accuracies are averaged
    weighted by the batch sizes.
    """
    sess = tf.get_default_session()
    num_examples = len(X_data)
    weighted_sum = 0
    for start in range(0, num_examples, BATCH_SIZE):
        stop = start + BATCH_SIZE
        batch_x = X_data[start:stop]
        feed = {x: batch_x,
                c: c_data[start:stop],
                y: y_data[start:stop],
                keep_prob_layer1: 1,
                keep_prob_layer2: 1,
                keep_prob_layer3: 1,
                keep_prob_layer4: 1}
        batch_accuracy = sess.run(accuracy_operation, feed_dict=feed)
        weighted_sum += (batch_accuracy * len(batch_x))
    return weighted_sum / num_examples
# Accuracy after every epoch, used for the learning curve below.
tac = []  # training accuracy
vac = []  # validation accuracy
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    num_examples = n_train
    print("num examples: {}".format(num_examples))
    print("training...")
    print()
    bestValidation = 0
    for i in range(EPOCHS):
        # Reshuffle images, color features and labels together every epoch.
        X_train_preprocessed, X_train_color_info, y_train_input = shuffle(X_train_preprocessed,
                                                                          X_train_color_info,
                                                                          y_train_input)
        for offset in range(0, num_examples, BATCH_SIZE):
            end = offset + BATCH_SIZE
            batch_x, batch_c, batch_y = X_train_preprocessed[offset:end], X_train_color_info[offset:end], y_train_input[offset:end]
            sess.run(training_operation, feed_dict={x: batch_x,
                                                    c: batch_c,
                                                    y: batch_y,
                                                    keep_prob_layer1: keep_prob['k1'],
                                                    keep_prob_layer2: keep_prob['k2'],
                                                    keep_prob_layer3: keep_prob['k3'],
                                                    keep_prob_layer4: keep_prob['k4']})
        training_accuracy = evaluate(X_train_preprocessed, X_train_color_info, y_train_input)
        validation_accuracy = evaluate(X_valid_preprocessed, X_valid_color_info, y_valid)
        tac.append(training_accuracy)
        vac.append(validation_accuracy)
        # Keep only the checkpoint with the best validation accuracy so far.
        if validation_accuracy>bestValidation:
            try:
                saver
            except NameError:
                # The exception class was misspelled as `nameError`, which
                # would itself have raised a NameError in this handler.
                saver = tf.train.Saver()
            saver.save(sess,save_file)
            print("Model Saved")
            bestValidation = validation_accuracy
        print("EPOCH {} ...".format(i+1))
        print("training acurracy = {:.3f}".format(training_accuracy))
        print("Validation acurracy = {:.3f}".format(validation_accuracy))
        print()
# Learning curve: training and validation accuracy per epoch.
fig = plt.figure()
epochs = range(len(tac))
plt.plot(epochs,tac, label = 'Training')
plt.plot(epochs,vac, label = 'validation')
plt.legend()
plt.title('Learning Curve')
plt.xlabel('EPOCHS')
plt.ylabel('Accuracy')
plt.show()
# Restore the best checkpoint and measure the accuracy on the test set.
with tf.Session() as sess:
    saver.restore(sess, save_file)
    print('Model restored.')
    # Reuse evaluate() (it reads the default session, i.e. `sess` here) so the
    # test set is fed in BATCH_SIZE chunks, exactly like the training and
    # validation sets, instead of as one huge batch.
    test_accuracy = evaluate(X_test_preprocessed, X_test_color_info, y_test)
    print('Test Accuracy: {}'.format(test_accuracy))
To give yourself more insight into how your model is working, download at least five pictures of German traffic signs from the web and use your model to predict the traffic sign type.
You may find signnames.csv useful as it contains mappings from the class id (integer) to the actual sign name.
Untermainbrücke, Mainkai, Frankfurt
# Load the cropped images found on the web (in file name order), resize them
# to 32x32 RGB and run them through the same preprocessing as the data set.
images_paths = sorted(glob.glob("cropped_traffic_signs/*"))
# Hand-assigned class ids, one per image, in the sorted file order.
X_labels = np.array([13, 12,13,17,1,25,33,15,15,17,17,1,24,14,14,25,14,35,17,17,
                     15,12,13,35,13,33,18,13,25,25,38,17,23,12,1,1,28,28,1,36])
internet_images = []
for img_path in images_paths:
    # The context manager closes the file; convert('RGB') also handles
    # grayscale, palette and RGBA images. Image.LANCZOS is the filter that
    # the removed Image.ANTIALIAS alias referred to.
    with Image.open(img_path) as img:
        resized = img.convert('RGB').resize((32,32), Image.LANCZOS)
    internet_images.append(np.array(resized, dtype = 'uint8'))
if internet_images:
    X_internet = np.stack(internet_images)
else:
    X_internet = np.empty((0,32,32,3),dtype= 'uint8')
plotGrid(X_internet,X_labels,signnames, 'internet_images_raw', 6, 43)
n_internet_images = X_internet.shape[0]
n_internet_labels = []
# np.float was removed from NumPy; float64 is the type it aliased.
X_internet_preprocessed = np.empty(X_internet.shape[0:3]+(1,),dtype = np.float64)
c_internet = np.empty((X_internet.shape[0],)+(divisions*colors,),dtype = np.float64)
for i in tqdm(range(n_internet_images)):
    X_internet_preprocessed[i], c_internet[i] = preProcess(X_internet[i],params)
plotGrid(X_internet_preprocessed,X_labels,signnames,'internet_images_preprocessed', 6, 43)
### Run the predictions here and use the model to output the prediction for each image.
### Make sure to pre-process the images with the same pre-processing pipeline used earlier.
### Feel free to use as many code cells as needed.
# Restore the saved model and compute, for every internet image, the softmax
# probabilities (dropout disabled) and the five highest ones; `best` holds
# their values and class indices.
with tf.Session() as sess:
    saver.restore(sess, save_file)
    predicted_classes = sess.run(tf.nn.softmax(logits), feed_dict={x: X_internet_preprocessed,
                                                                   c: c_internet,
                                                                   keep_prob_layer1: 1,
                                                                   keep_prob_layer2: 1,
                                                                   keep_prob_layer3: 1,
                                                                   keep_prob_layer4: 1})
    best = sess.run(tf.nn.top_k(predicted_classes, k=5))
### Calculate the accuracy for these 5 new images.
### For example, if the model predicted 1 out of 5 signs correctly, it's 20% accurate on these new images.
# Accuracy on the internet images: share of images whose top-1 prediction
# equals the hand-assigned label.
correct = float(sum(1 for i, prediction in enumerate(best.indices)
                    if prediction[0] == X_labels[i]))
internet_test_accuracy = correct/n_internet_images
print('Internet Test Accuracy: {}'.format(internet_test_accuracy) )
### Print out the top five softmax probabilities for the predictions on the German traffic sign images found on the web.
### Feel free to use as many code cells as needed.
# For every internet image draw the image next to a horizontal bar chart of
# its five highest softmax probabilities, labelled with the sign names.
for ind,img in enumerate(X_internet):
    fig, axes = plt.subplots(1, 2,figsize=(20,2))
    fig.subplots_adjust(wspace=0.3)
    axes[0].imshow(img)
    axes[1].barh([0,1,2,3,4],best.values[ind])
    labels = [signnames[x] for x in best.indices[ind]]
    plt.yticks([0,1,2,3,4], labels)
    plt.show()
# NOTE(review): a single file name is used for all figures, so only one of
# them ends up in 'predictions.png' - confirm whether one file per image was
# intended.
fig.savefig(images_dir + 'predictions.png')
### Visualize your network's feature maps here.
### Feel free to use as many code cells as needed.
# image_input: the test image being fed into the network to produce the feature maps
# tf_activation: should be a tf variable name used during your training procedure that represents the calculated state of a specific weight layer
# activation_min/max: can be used to view the activation contrast in more detail, by default matplot sets min and max to the actual min and max values of the output
# plt_num: used to plot out multiple different weight feature map sets on the same block, just extend the plt number for each new feature map entry
def outputFeatureMap(image_input,color_input, tf_activation, activation_min=-1, activation_max=-1 ,plt_num=1):
    """Plot every feature map of a layer for the first image of a batch.

    The activation tensor is evaluated in the module-level session `sess`
    with dropout disabled. The maps are drawn in a grid with eight columns
    and the figure is saved as 'network_activations.png' in `images_dir`.

    Args:
        image_input: preprocessed image batch fed to placeholder `x`.
        color_input: color feature batch fed to placeholder `c`.
        tf_activation: tensor to visualize; its last axis indexes the
            feature maps.
        activation_min: lower display limit, -1 to let matplotlib choose.
        activation_max: upper display limit, -1 to let matplotlib choose.
        plt_num: matplotlib figure number to draw into.
    """
    activation = tf_activation.eval(session=sess,feed_dict={x : image_input,
                                                            c : color_input,
                                                            keep_prob_layer1: 1,
                                                            keep_prob_layer2: 1,
                                                            keep_prob_layer3: 1,
                                                            keep_prob_layer4: 1})
    featuremaps = activation.shape[3]
    columns = 8
    # Round up so that any number of feature maps fits into the grid.
    rows = (featuremaps + columns - 1)//columns
    figure = plt.figure(plt_num, figsize=(15,15))
    # Only pass the limits that were actually requested. The former test
    # `activation_min != -1 & activation_max != -1` parsed as a chained
    # comparison against `-1 & activation_max`.
    display_options = {'interpolation': "nearest", 'cmap': "gray"}
    if activation_min != -1:
        display_options['vmin'] = activation_min
    if activation_max != -1:
        display_options['vmax'] = activation_max
    for featuremap in range(featuremaps):
        plt.subplot(rows, columns, featuremap+1) # sets the number of feature maps to show on each row and column
        plt.title('FeatureMap ' + str(featuremap)) # displays the feature map number
        plt.imshow(activation[0,:,:, featuremap], **display_options)
    # Save the figure created here (not the stale global `fig`) and do so
    # before plt.show(), which may release the figure on some backends.
    figure.savefig(images_dir + 'network_activations.png')
    plt.show()
# Visualize the first convolution stage (out_1). A batch of 200 training
# samples is fed to the network.
input_image = X_train_preprocessed[0:200]
input_color = X_train_color_info[0:200]
with tf.Session() as sess:
    saver.restore(sess, save_file)
    outputFeatureMap(input_image,input_color, out_1)